package com.rem.flink.flink2Source;


import cn.hutool.core.util.RandomUtil;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 自定义一个生产数据源 1s/次
 *
 * @author Rem
 * @date 2022/10/9
 */
public class ClickSource implements SourceFunction<Event> {
    /**
     * 声明一个布尔变量，作为控制数据生成的标识位
     */
    private Boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();
        String[] users = {"Mary", "Alice", "Bob", "张三", "李四"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

//        Calendar.getInstance().getTimeInMillis()
        Long a = 1000L;
        while (running) {
            a += 1000L;
            ctx.collect(new Event(users[random.nextInt(users.length)], urls[random.nextInt(urls.length)], a));
            // 隔1秒生成一个点击事件，方便观测

            TimeUnit.MILLISECONDS.sleep(RandomUtil.randomLong(100, 600));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

}
